/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.zebra.types;

import org.apache.pig.data.Tuple;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.Map;
import java.util.ArrayList;
import java.util.Iterator;
import java.io.StringReader;
import org.apache.hadoop.io.BytesWritable;
import java.io.IOException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.hadoop.zebra.schema.Schema;
import org.apache.hadoop.zebra.parser.TableSchemaParser;
import org.apache.hadoop.zebra.parser.TableStorageParser;
import org.apache.hadoop.zebra.parser.ParseException;
import org.apache.hadoop.zebra.schema.ColumnType;

/**
 * Partition is a column group management class. Its responsibilities include
 * storing column groups and handling the splitting and stitching of column
 * groups for insertions and queries, respectively.
 */
public class Partition {
  /**
   * Kinds of storage split that can be set on a (sub)column.
   * {@code NONE} is the initial value used when no split has been set.
   */
  public enum SplitType {
    NONE,
    RECORD,
    COLLECTION,
    MAP
  }

  /*
   * This class holds the column group information as generated by
   * TableStorageParser.
   */
  public class PartitionInfo {
    private Map<Schema.ColumnSchema, PartitionFieldInfo> fieldMap =
        new HashMap<Schema.ColumnSchema, PartitionFieldInfo>();
    private Map<String, HashSet<ColumnMappingEntry>> mColMap =
        new HashMap<String, HashSet<ColumnMappingEntry>>();
    private Schema mSchema;

    /**
     * Creates an empty partition info bound to a table schema.
     *
     * @param schema the schema whose columns are walked when the default
     *               column group schema is generated
     */
    public PartitionInfo(Schema schema) {
      this.mSchema = schema;
    }
    
    /**
     * @return the internal (not copied) map from a column name to the set of
     *         mapping entries registered for that name
     */
    public Map<String, HashSet<ColumnMappingEntry>> getColMap() {
      return this.mColMap;
    }

    /*
     * Holds a mapping between a column in the table schema and its
     * corresponding (sub)column group index and the field index inside the
     * column group. An entry may additionally carry the set of map keys that
     * were assigned to it through addKeys().
     */
    class ColumnMappingEntry implements Comparable<ColumnMappingEntry> {
      // both indices stay at -1 until set by the three-argument constructor
      private int cgIndex = -1, fieldIndex = -1;
      private Schema.ColumnSchema fs;
      // map keys assigned to this entry; null until addKeys() is first called
      private HashSet<String> keySet;

      /**
       * @param ri column group index
       * @param fi field index inside the column group
       * @param fs schema of the mapped (sub)column
       */
      public ColumnMappingEntry(int ri, int fi, Schema.ColumnSchema fs) {
        cgIndex = ri;
        fieldIndex = fi;
        this.fs = fs;
      }

      /**
       * Creates an entry with both indices left at -1 (see {@link #invalid()}).
       */
      public ColumnMappingEntry() {
      }

      /**
       * Adds map keys to this entry and to the per-column key set.
       *
       * @param keys         keys to add
       * @param columnKeySet keys already used on the same column; every
       *                     accepted key is also recorded here
       * @return false as soon as a key is found to exist already, either in
       *         {@code columnKeySet} or in this entry; keys added before the
       *         clash are NOT rolled back
       */
      public boolean addKeys(HashSet<String> keys, HashSet<String> columnKeySet)
      {
        if (keySet == null)
          keySet = new HashSet<String>();
        for (String key : keys)
        {
          // is the key already used by another CG of the same column?
          if (!columnKeySet.add(key))
            return false;

          if (!keySet.add(key))
            return false;
        }
        return true;
      }

      public int getCGIndex() {
        return cgIndex;
      }

      public int getFieldIndex() {
        return fieldIndex;
      }

      public Schema.ColumnSchema getColumnSchema() {
        return fs;
      }

      /**
       * @return true when neither the column group index nor the field index
       *         has been set (both are still -1)
       */
      public boolean invalid() {
        return (cgIndex == -1 && fieldIndex == -1);
      }

      /**
       * Orders entries by column group index first and by field index second.
       * The indices are compared explicitly rather than subtracted so that the
       * sign of the result cannot be corrupted by integer overflow.
       *
       * @return -1, 0 or 1
       */
      public int compareTo(ColumnMappingEntry anotherEntry) {
        int otherCG = anotherEntry.getCGIndex();
        if (this.cgIndex != otherCG)
          return (this.cgIndex < otherCG ? -1 : 1);

        int otherField = anotherEntry.getFieldIndex();
        if (this.fieldIndex == otherField)
          return 0;
        return (this.fieldIndex < otherField ? -1 : 1);
      }

      /**
       * @return the keys assigned to this entry, or null if addKeys() was
       *         never called on it
       */
      HashSet<String> getKeys() {
        return keySet;
      }
    }

    /**
     * This class holds the column group info for a (sub)column which is a unit
     * in a column group: its own mapping entry, the entries created for
     * MAP-key splits on it, and the split type that was set on it.
     */
    class PartitionFieldInfo {
      // entries created by MAP-key splits on this (sub)column
      private HashSet<PartitionInfo.ColumnMappingEntry> mSplitMaps =
          new HashSet<ColumnMappingEntry>();
      // entry of the (sub)column itself; null until setCGIndex() succeeds or a
      // default entry is generated
      private ColumnMappingEntry mCGIndex = null;
      private String mCGName = null; // fully qualified name
      // all map keys used by MAP-key splits on this (sub)column
      private HashSet<String> keySet = null;
      private SplitType stype = SplitType.NONE;
      // names of the children for which a split was registered via setSplit()
      private HashSet<String> splitChildren = new HashSet<String>();

      /**
       * Registers a MAP-key split on this (sub)column. Several MAP splits on
       * one MAP column are allowed as long as their keys do not overlap.
       *
       * @param ri   column group index
       * @param fi   field index inside the column group
       * @param name not used by this method
       * @param fs   column schema stored in the new mapping entry
       * @param keys map keys routed to the new entry
       * @return false if a key was already in use (the new entry stays
       *         registered in that case)
       */
      boolean setKeyCGIndex(int ri, int fi, String name, Schema.ColumnSchema fs, HashSet<String> keys) {
        ColumnMappingEntry cme = new ColumnMappingEntry(ri, fi, fs);
        mSplitMaps.add(cme);
        // multiple map splits on one MAP column is allowed!
        if (keySet == null)
          keySet = new HashSet<String>();
        return cme.addKeys(keys, keySet);
      }

      /**
       * Sets the mapping entry of a record field split (sub)column.
       *
       * @return false, without changing anything, if an entry was already set
       */
      boolean setCGIndex(int ri, int fi, String name, Schema.ColumnSchema fs) {
        if (mCGIndex != null) return false;
        mCGIndex = new ColumnMappingEntry(ri, fi, fs);
        mCGName = name;
        return true;
      }

      ColumnMappingEntry getCGIndex() {
        return mCGIndex;
      }

      String getCGName() {
        return mCGName;
      }

      /**
       * Sets the split type of a (sub)column.
       *
       * @param st         split type to record
       * @param cst        split type of the child; only checked against MAP
       * @param name       column name used in the error message
       * @param childName  name of the child involved in the split
       * @param splitChild true if the split is registered for the child
       * @throws ParseException if the split is requested at a level that
       *         conflicts with splits registered earlier
       */
      void setSplit(SplitType st, SplitType cst, String name, String childName, boolean splitChild) throws ParseException {
        // multiple MAP splits of a field and its children on different keys are ok
        if (st == stype && (st == SplitType.MAP || cst == SplitType.MAP))
          return;

        if (splitChild)
        {
          if (stype != SplitType.NONE && splitChildren.isEmpty())
            throw new ParseException("Split on "+name+" is set at different levels.");
          splitChildren.add(childName);
        } else if (splitChildren.contains(childName)) {
          throw new ParseException("Split on "+name+" is set at different levels.");
        }
        stype = st;
      }

      /*
       * Creates default "catch-all" column schemas, where necessary, for the
       * children of fs0. Children without a mapping entry are appended to
       * schema under the name prefix.child and mapped to defaultCGIndex;
       * every visited child ends up registered in colmap.
       *
       * fs0            column whose child columns are visited
       * schema         default CG schema under construction; must not be null
       * defaultCGIndex index of the default column group
       * colmap         name-to-entries map that is read and updated
       * prefix         fully qualified name of the parent of fs0, or null
       * fmap           column-to-field-info map, filled in on demand
       */
      void generateDefaultCGSchema(Schema.ColumnSchema fs0, Schema schema,
          int defaultCGIndex,
          Map<String, HashSet<PartitionInfo.ColumnMappingEntry>> colmap, String prefix,
          Map<Schema.ColumnSchema, PartitionFieldInfo> fmap)
          throws ParseException {
        if (schema == null) throw new AssertionError("Interal Logic Error.");
        String name = fs0.getName();
        if (prefix == null) prefix = name;
        else if (name != null && !name.isEmpty())
          prefix += "." + name;

        Schema schema0 = fs0.getSchema();
        for (int i = 0; i < schema0.getNumColumns(); i++) {
          Schema.ColumnSchema fs = schema0.getColumn(i);
          PartitionFieldInfo pi = fmap.get(fs);
          if (pi == null) {
            pi = new PartitionFieldInfo();
            fmap.put(fs, pi);
          }

          /*
           * won't go down for MAP split because only one level MAP split is
           * supported now
           */
          if (pi.stype != SplitType.NONE && pi.stype != SplitType.MAP) {
            /* go to the lower level */
            pi.generateDefaultCGSchema(fs, schema, defaultCGIndex, colmap,
                prefix, fmap);
          }
          else if (pi.mCGIndex != null) {
            // Look up and store through the same map (colmap). The lookup
            // used to go through the enclosing mColMap while the store went
            // to colmap, which only worked because callers pass mColMap.
            HashSet<ColumnMappingEntry> cms = colmap.get(pi.mCGName);
            if (cms == null)
            {
              cms = new HashSet<ColumnMappingEntry>();
              colmap.put(pi.mCGName, cms);
            }
            cms.add(pi.mCGIndex);
            cms.addAll(pi.mSplitMaps);
          } else {
            String fqName = prefix + "." + fs.getName();
            HashSet<ColumnMappingEntry> cms = colmap.get(fqName);
            if (cms == null)
            {
              cms = new HashSet<ColumnMappingEntry>();
              colmap.put(fqName, cms);
            }
            // the field index is the position the column is about to take
            // in the default schema, so it must be read before schema.add()
            pi.mCGIndex = new ColumnMappingEntry(defaultCGIndex,
                  schema.getNumColumns(), fs);
            pi.mCGName = fqName;
            cms.add(pi.mCGIndex);

            schema.add(new Schema.ColumnSchema(fqName,
                fs.getSchema(), fs.getType()));
            cms.addAll(pi.mSplitMaps);
          }
        }
      }
    }

    /**
     * Registers a MAP key split on a (sub)column, creating the field info for
     * the column on first use.
     *
     * @param fs   the MAP column; the first column of its schema is passed on
     *             as the schema of the new mapping entry
     * @param ri   column group index
     * @param fi   field index inside the column group
     * @param name passed through to the field info
     * @param keys map keys routed to the column group
     * @return the result of the field info's own setKeyCGIndex
     */
    public boolean setKeyCGIndex(Schema.ColumnSchema fs, int ri, int fi, String name, HashSet<String> keys) {
      PartitionFieldInfo info = fieldMap.get(fs);
      if (info == null) {
        info = new PartitionFieldInfo();
        fieldMap.put(fs, info);
      }
      Schema.ColumnSchema firstColumn = fs.getSchema().getColumn(0);
      return info.setKeyCGIndex(ri, fi, name, firstColumn, keys);
    }

    /**
     * Sets the mapping entry of a record field split (sub)column, creating the
     * field info for the column on first use.
     *
     * @param fs   the (sub)column
     * @param ri   column group index
     * @param fi   field index inside the column group
     * @param name name recorded as the column group name of the (sub)column
     * @return false if the (sub)column already had a mapping entry
     */
    public boolean setCGIndex(Schema.ColumnSchema fs, int ri, int fi, String name) {
      PartitionFieldInfo info = fieldMap.get(fs);
      if (info == null) {
        info = new PartitionFieldInfo();
        fieldMap.put(fs, info);
      }
      return info.setCGIndex(ri, fi, name, fs);
    }

    /**
     * Sets the split type of a (sub)column, creating the field info for the
     * column on first use and delegating the consistency checks to it.
     *
     * @throws ParseException if the field info rejects the split
     */
    void setSplit(Schema.ColumnSchema fs, SplitType st, SplitType cst, String name, String childName, boolean splitChild) throws ParseException {
      PartitionFieldInfo info = fieldMap.get(fs);
      if (info == null) {
        info = new PartitionFieldInfo();
        fieldMap.put(fs, info);
      }
      info.setSplit(st, cst, name, childName, splitChild);
    }

    /*
     * Creates the default "catch-all" CG schema if necessary. Every top-level
     * column of the table schema is visited: columns split below the top
     * level are handed to their field info, columns that already have a
     * mapping entry are only registered in the column map, and all remaining
     * columns are appended to the default schema and mapped to defaultCGIndex.
     *
     * Returns null when no column ended up in the default schema; otherwise a
     * CGSchema built from it and the given attributes.
     */
    public CGSchema generateDefaultCGSchema(String name, String compressor,
        String serializer, String owner, String group, 
        short perm, final int defaultCGIndex, String comparator) throws ParseException {
      Schema defaultColumns = new Schema();
      for (int col = 0; col < mSchema.getNumColumns(); col++) {
        Schema.ColumnSchema column = mSchema.getColumn(col);
        PartitionFieldInfo info = fieldMap.get(column);
        if (info == null) {
          info = new PartitionFieldInfo();
          fieldMap.put(column, info);
        }

        /*
         * won't go down for MAP split because only one level MAP split is
         * supported now
         */
        boolean splitBelow =
            !(info.stype == SplitType.NONE || info.stype == SplitType.MAP);
        if (splitBelow) {
          /* go to the lower level */
          info.generateDefaultCGSchema(column, defaultColumns, defaultCGIndex,
              mColMap, null, fieldMap);
          continue;
        }

        boolean alreadyMapped = (info.mCGIndex != null);
        String cgName = (alreadyMapped ? info.mCGName : column.getName());
        HashSet<ColumnMappingEntry> entries = mColMap.get(cgName);
        if (entries == null) {
          entries = new HashSet<ColumnMappingEntry>();
          mColMap.put(cgName, entries);
        }

        if (alreadyMapped) {
          entries.add(info.mCGIndex);
        } else {
          // the field index is the position the column is about to take in
          // the default schema, so both uses come before the add() below;
          // setCGIndex() records its own entry object on the field info
          entries.add(new ColumnMappingEntry(defaultCGIndex,
              defaultColumns.getNumColumns(), column));
          setCGIndex(column, defaultCGIndex, defaultColumns.getNumColumns(),
              column.getName());
          defaultColumns.add(column);
        }
        entries.addAll(info.mSplitMaps);
      }

      if (defaultColumns.getNumColumns() == 0) {
        return null;
      }
      return new CGSchema(defaultColumns, false, comparator, name, serializer,
          compressor, owner, group, perm);
    }

    /**
     * Returns the "hash key-to-(sub)column" entries of a (sub)column which is
     * MAP-split across different hash keys. A field info is created for the
     * column on first use, so the result is never null; it is empty when no
     * MAP-key split was registered.
     *
     * The lookup uses this instance's own field map. It previously went
     * through the enclosing Partition's mPartitionInfo, which only gave the
     * right answer when this object was that very instance.
     *
     * @param fs the (sub)column
     * @return the internal (not copied) set of MAP-key split entries
     */
    public HashSet<PartitionInfo.ColumnMappingEntry> getSplitMap(
        Schema.ColumnSchema fs) {
      PartitionFieldInfo pi = fieldMap.get(fs);
      if (pi == null) {
        pi = new PartitionFieldInfo();
        fieldMap.put(fs, pi);
      }
      return pi.mSplitMaps;
    }
  }

  /*
   * Records one required column group together with the partitioned columns
   * that use it and the projection each of those users asked for.
   */
  private class CGEntry {
    // partitioned columns that need data from this column group
    private ArrayList<PartitionedColumn> mSources = null;
    // projection of each user, parallel to mSources
    private ArrayList<String> mProjections = null;
    private int mSize = 0; // number of users
    private int mCGIndex = -1;
    private Tuple mTuple = null;
    // map of a projection index to a set of interested hash keys
    private HashMap<Integer, HashSet<String>> mKeyMap;

    CGEntry(int cgindex) {
      mSources = new ArrayList<PartitionedColumn>();
      mProjections = new ArrayList<String>();
      mCGIndex = cgindex;
    }

    /**
     * Registers a user and hands it the next free projection index.
     */
    void addUser(Partition.PartitionedColumn src, String projection) {
      mSources.add(src);
      mProjections.add(projection);
      final int slot = mSize++;
      src.setProjIndex(slot);
    }

    /**
     * Registers a user that is only interested in the given hash keys and
     * hands it the next free projection index.
     */
    void addUser(Partition.PartitionedColumn src, String projection, HashSet<String> keys) {
      mSources.add(src);
      mProjections.add(projection);
      if (mKeyMap == null) {
        mKeyMap = new HashMap<Integer, HashSet<String>>();
      }
      final int slot = mSize++;
      mKeyMap.put(slot, keys);
      src.setProjIndex(slot);
    }

    /** Runs cleanup() on every registered user. */
    void cleanup()
    {
      for (PartitionedColumn user : mSources) {
        user.cleanup();
      }
    }

    /**
     * Copies the record of every user into the tuple set by setSource(), at
     * the user's projection index. The key argument is not used.
     */
    void insert(final BytesWritable key) throws ExecException {
      for (int idx = 0; idx < mSize; idx++) {
        PartitionedColumn user = mSources.get(idx);
        mTuple.set(user.getProjIndex(), user.getRecord());
      }
    }

    /**
     * Hands every user the field of the tuple set by setSource() that sits at
     * the user's projection index.
     */
    void read() throws ExecException {
      for (int idx = 0; idx < mSize; idx++) {
        PartitionedColumn user = mSources.get(idx);
        Object field = mTuple.get(user.getProjIndex());
        user.setRecord(field);
      }
    }

    void setSource(Tuple tuple) {
      mTuple = tuple;
    }

    int getCGIndex() {
      return mCGIndex;
    }

    /**
     * return untyped projection schema: the projections joined by ",", each
     * followed by "#{k1|k2|...}" when hash keys were registered for it
     */
    String getProjection() throws ParseException {
      StringBuilder text = new StringBuilder();
      final int count = mProjections.size();
      for (int idx = 0; idx < count; idx++) {
        if (idx > 0) text.append(",");
        text.append(mProjections.get(idx));

        HashSet<String> keySet = (mKeyMap == null ? null : mKeyMap.get(idx));
        if (keySet == null)
          continue;
        if (keySet.isEmpty())
          throw new AssertionError(
            "Internal Logical Error: Empty key map.");

        text.append("#{");
        boolean first = true;
        for (String hashKey : keySet) {
          if (!first)
            text.append("|");
          text.append(hashKey);
          first = false;
        }
        text.append("}");
      }
      return text.toString();
    }
  }

  /**
   * stitch and split execution class: a node in the tree of partitioned
   * columns that either assembles its record from its children (stitch) or
   * distributes its record to them (split)
   */
  private class PartitionedColumn {
    private ArrayList<PartitionedColumn> mChildren = null; // partitioned children
    private int mChildrenLen = 0;
    private int mFieldIndex = -1; // field index in parent
    private int mProjIndex = -1; // field index in CG projection: only used by a leaf
    private Partition.SplitType mSplitType = Partition.SplitType.NONE;
    private Object mTuple = null; // the record: cast to Tuple or Map where it is used
    private boolean mNeedTmpTuple;
    private HashSet<String> mKeys; // interested hash keys
    private PartitionedColumn parent = null;

    PartitionedColumn(int fi, boolean needTmpTuple)
        throws IOException {
      mNeedTmpTuple = needTmpTuple;
      mFieldIndex = fi;
    }

    PartitionedColumn(int fi, Partition.SplitType st,
        boolean needTmpTuple) throws IOException {
      this(fi, needTmpTuple);
      mSplitType = st;
    }

    void setKeys(HashSet<String> keys) {
      mKeys = keys;
    }

    private void setParent(PartitionedColumn parent) {
      this.parent = parent;
    }

    /**
     * stitch op: with split type NONE every child's record is stored in this
     * column's tuple at the child's field index; otherwise every child's map
     * is merged into this column's map
     */
    @SuppressWarnings("unchecked")
    void stitch() throws IOException {
      final boolean recordStitch = (mSplitType == Partition.SplitType.NONE);
      for (int idx = 0; idx < mChildrenLen; idx++) {
        PartitionedColumn kid = mChildren.get(idx);
        if (recordStitch) {
          ((Tuple) mTuple).set(kid.mFieldIndex, kid.getRecord());
        } else {
          // stitch MAP-key partitioned hashes:
          // add the new (key,value) pairs to the existing map
          ((Map<String, Object>) mTuple)
              .putAll((Map<String, Object>) kid.getRecord());
        }
      }
    }

    /**
     * split op: with split type NONE every child receives the field of this
     * column's tuple at the child's field index; otherwise the entries whose
     * keys a child is interested in are moved from a copy of this column's
     * map into the child's map
     */
    @SuppressWarnings("unchecked")
    void split() throws IOException {
      if (mSplitType == SplitType.NONE) {
        // record split
        for (int idx = 0; idx < mChildrenLen; idx++) {
          PartitionedColumn kid = mChildren.get(idx);
          kid.setRecord(((Tuple) mTuple).get(kid.mFieldIndex));
        }
        return;
      }

      // split MAP columns excluding the hashes already in split key CGs;
      // work on a clone so the input MAP is intact after keys are yanked
      mTuple = ((HashMap<String, Object>) mTuple).clone();
      Map<String, Object> remaining = (Map<String, Object>) mTuple;
      for (int idx = 0; idx < mChildrenLen; idx++) {
        PartitionedColumn kid = mChildren.get(idx);
        Map<String, Object> kidMap = (Map<String, Object>) kid.getRecord();
        for (String hashKey : kid.mKeys) {
          Object value = remaining.get(hashKey);
          if (value == null)
            continue;
          kidMap.put(hashKey, value);
          remaining.remove(hashKey);
        }
      }
    }

    Object getRecord() {
      return mTuple;
    }

    void setRecord(Object t) {
      mTuple = t;
    }

    void addChild(PartitionedColumn child) {
      if (mChildren == null) {
        mChildren = new ArrayList<PartitionedColumn>();
      }
      mChildren.add(child);
      mChildrenLen++;
      child.setParent(this);
    }

    /**
     * Detaches this column from its parent, if it has one, and drops the
     * record when this column owns a temporary tuple.
     */
    void cleanup() {
      if (parent != null) {
        parent.removeChild(this);
      }
      if (mNeedTmpTuple) {
        mTuple = null;
      }
    }

    /**
     * Removes every occurrence of the given child (compared by identity).
     */
    void removeChild(PartitionedColumn child)
    {
      // walk backwards so that a removal never shifts an unvisited element
      for (int idx = mChildrenLen - 1; idx >= 0; idx--)
      {
        if (mChildren.get(idx) == child)
        {
          mChildren.remove(idx);
          mChildrenLen--;
        }
      }
    }

    void setProjIndex(int projindex) {
      mProjIndex = projindex;
    }

    int getProjIndex() {
      return mProjIndex;
    }

    /**
     * Allocates the temporary tuple, with one field per child and at least
     * one field, if this column was created with needTmpTuple set.
     */
    void createTmpTuple() throws IOException {
      if (!mNeedTmpTuple)
        return;
      int size = Math.max(mChildrenLen, 1);
      mTuple = TypesUtils.createTuple(size);
    }

    /**
     * replaces the record with a new, empty map
     */
    void createMap()
    {
      mTuple = new HashMap<String, Object>();
    }

    /**
     * clear map
     */
    @SuppressWarnings("unchecked")
    void clearMap()
    {
      ((Map)mTuple).clear();
    }
  }

  private HashMap<Integer, CGEntry> mCGs = null; // involved CGs
  private CGEntry[] mCGList = new CGEntry[0];
  private ArrayList<PartitionedColumn> mExecs = null; // stitches to be
  // performed in sequence:
  // called by LOAD
  private int mStitchSize = 0; // number of the stitches
  private int mSplitSize = 0; // number of the splits
  private Schema mSchema = null;
  private CGSchema[] mCGSchemas;
  private PartitionInfo mPartitionInfo;
  private Projection mProjection = null;
  private ArrayList<PartitionedColumn> mPCNeedTmpTuple = new ArrayList<PartitionedColumn>();
  private ArrayList<PartitionedColumn> mPCNeedMap = new ArrayList<PartitionedColumn>();
  private String comparator;
  private boolean mSorted;
  private SortInfo mSortInfo;

  /**
   * ctor used for LOAD: parses the storage specification into column group
   * schemas and then builds, for every column of the projection, the
   * partitioned columns and stitch executions needed to assemble it from the
   * column groups.
   *
   * @param schema     table schema
   * @param projection projection to be served; its projection schema and
   *                   projected keys are consulted
   * @param storage    storage specification text handed to TableStorageParser
   * @param comparator handed to TableStorageParser
   * @throws ParseException declared; raised by the parsing/mapping helpers
   * @throws IOException    declared; raised by the stitch building helpers
   */
  // NOTE(review): the comparator argument is only passed to the parser here;
  // the comparator field is not assigned by this constructor - confirm that
  // getComparator() returning null after a LOAD construction is intended.
  public Partition(Schema schema, Projection projection, String storage, String comparator)
      throws ParseException, IOException {
    mSchema = schema;
    // the parser receives this Partition before mPartitionInfo is assigned
    // (storeConst() does it the other way round); order kept as is
    TableStorageParser sparser =
        new TableStorageParser(new StringReader(storage), this, mSchema, comparator);
    mPartitionInfo = new PartitionInfo(schema);
    ArrayList<CGSchema> cgschemas = new ArrayList<CGSchema>();
    sparser.StorageSchema(cgschemas);
    mCGSchemas = cgschemas.toArray(new CGSchema[cgschemas.size()]);
    mProjection = projection;
    Schema projSchema = projection.getProjectionSchema();
    int size = projSchema.getNumColumns();
    HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>> cgindices;
    PartitionInfo.ColumnMappingEntry cgindex;
    mCGs = new HashMap<Integer, CGEntry>();
    Schema.ColumnSchema fs;
    CGEntry cgentry;
    Schema.ParsedName pname = new Schema.ParsedName();
    mExecs = new ArrayList<PartitionedColumn>();
    PartitionedColumn parCol, curCol = new PartitionedColumn(-1, false); // top level: target of stitch
    String name;
    HashSet<String> projectedKeys;
    for (int i = 0; i < size; i++) // depth-first
    {
      // null entries of the projection schema are skipped
      if (projSchema.getColumn(i) == null)
        continue;
      name = projSchema.getColumn(i).getName();
      pname.setName(name);
      // keys projected on this column, or null when there are none
      projectedKeys = (projection.getKeys() == null ? null :
                       projection.getKeys().get(projSchema.getColumn(i)));
      cgindices = getColMapping(schema, name, pname, projectedKeys);
      if (cgindices != null) {
        // either needs split of a CG column or the projection is a CG proper
        fs = schema.getColumnSchema(name);
        if (getSplitMap(fs).isEmpty()) {
          // no MAP-key split on the column: exactly one mapping is expected
          if (cgindices.size() != 1)
            throw new AssertionError( "Internal Logical Error: one RECORD split is expected.");
          Set<Map.Entry<PartitionInfo.ColumnMappingEntry, HashSet<String>>> entrySet = cgindices.entrySet();
          Map.Entry<PartitionInfo.ColumnMappingEntry, HashSet<String>> mapentry;
          mapentry = entrySet.iterator().next();
          cgindex = mapentry.getKey();
          if (cgindex == null)
            throw new AssertionError( "Internal Logical Error: RECORD does not have a CG index.");
          cgentry = getCGEntry(cgindex.getCGIndex());
          parCol = new PartitionedColumn(i, false);
          cgentry.addUser(parCol, name);
          curCol.addChild(parCol);
        } else {
          // leaves are not added to exec!
          // NOTE(review): this check repeats the negation of the enclosing
          // condition and looks redundant - confirm before simplifying
          if (!getSplitMap(fs).isEmpty()) {
            // this subtype is MAP-split
            handleMapStitch(pname, curCol, fs, i,
                getCGIndex(fs).getFieldIndex(), cgindices);
          }
        }
      }
      else {
        // a composite column of CGs
        fs = schema.getColumnSchema(name);
        if (fs == null) continue;
        parCol = new PartitionedColumn(i, true);
        mPCNeedTmpTuple.add(parCol);
        buildStitch(fs, pname, parCol); // depth-first
        // the composite is queued after buildStitch has queued its
        // descendants
        mExecs.add(parCol);
        mStitchSize++;
        curCol.addChild(parCol);
      }
    }
    // allocate the temporary tuples now that the number of children is known
    for (int i = 0; i < mPCNeedTmpTuple.size(); i++)
      mPCNeedTmpTuple.get(i).createTmpTuple();
    // the top-level column is queued last
    mExecs.add(curCol);
    mStitchSize++;
  }

  /**
   * ctor used by STORE when sort columns may be given. The table is treated
   * as sorted when the parsed sort info is non-null and non-empty; the
   * comparator then comes from the sort info, otherwise it is the empty
   * string.
   *
   * @param schema      table schema text
   * @param storage     storage specification text
   * @param comparator  comparator handed to SortInfo.parse
   * @param sortColumns sort columns handed to SortInfo.parse
   */
  public Partition(final String schema, final String storage, String comparator, String sortColumns)
        throws ParseException, IOException
  {
    mSchema = new TableSchemaParser(new StringReader(schema)).RecordSchema(null);
    mSortInfo = SortInfo.parse(sortColumns, mSchema, comparator);
    mSorted = !(mSortInfo == null || mSortInfo.size() <= 0);
    if (mSorted) {
      this.comparator = mSortInfo.getComparator();
    } else {
      this.comparator = "";
    }
    storeConst(storage);
  }

  /**
   * ctor used by STORE without sort columns: parses the table schema, keeps
   * the comparator as given and builds the split executions.
   *
   * @param schema     table schema text
   * @param storage    storage specification text
   * @param comparator comparator stored as is and later handed to the
   *                   storage parser
   */
  public Partition(String schema, final String storage, String comparator)
      throws ParseException, IOException
  {
    StringReader schemaText = new StringReader(schema);
    mSchema = new TableSchemaParser(schemaText).RecordSchema(null);
    this.comparator = comparator;
    storeConst(storage);
  }

  /**
   * Shared tail of the STORE constructors: parses the storage specification
   * into column group schemas and builds, breadth-first, the partitioned
   * columns and split executions for every top-level column of the table
   * schema. Expects mSchema and comparator to be set already.
   *
   * @param storage storage specification text handed to TableStorageParser
   */
  private void storeConst(final String storage)
      throws ParseException, IOException
  {
    mPartitionInfo = new PartitionInfo(mSchema);
    TableStorageParser storageParser =
      new TableStorageParser(new StringReader(storage), this, mSchema, this.comparator);
    ArrayList<CGSchema> parsedSchemas = new ArrayList<CGSchema>();
    storageParser.StorageSchema(parsedSchemas);
    mCGSchemas = parsedSchemas.toArray(new CGSchema[parsedSchemas.size()]);

    final int columnCount = mSchema.getNumColumns();
    mCGs = new HashMap<Integer, CGEntry>();
    mExecs = new ArrayList<PartitionedColumn>();

    // top level: source of the split
    PartitionedColumn root = new PartitionedColumn(-1, false);
    mExecs.add(root); // breadth-first
    mSplitSize++;

    for (int col = 0; col < columnCount; col++) {
      Schema.ColumnSchema column = mSchema.getColumn(col);
      PartitionInfo.ColumnMappingEntry mapping = getCGIndex(column);

      if (mapping == null) {
        // a composite column of CGs
        PartitionedColumn composite = new PartitionedColumn(col, false);
        mExecs.add(composite); // breadth-first
        mSplitSize++;
        buildSplit(column, composite);
        root.addChild(composite);
        continue;
      }

      // a CG field
      CGEntry entry = getCGEntry(mapping.getCGIndex());
      // leaves are not added to exec!
      if (!getSplitMap(column).isEmpty()) {
        // this subtype is MAP-split
        // => need to add splits for all split keys
        handleMapSplit(root, column, col, entry, mapping.getFieldIndex());
      } else {
        PartitionedColumn leaf = new PartitionedColumn(col, false);
        entry.addUser(leaf, null); // null mean all schema
        // overrides the index handed out by addUser with the field index
        leaf.setProjIndex(mapping.getFieldIndex());
        root.addChild(leaf);
      }
    }

    for (PartitionedColumn needsTuple : mPCNeedTmpTuple) {
      needsTuple.createTmpTuple();
    }

    for (PartitionedColumn needsMap : mPCNeedMap) {
      needsMap.createMap();
    }
  }

  /**
   * @return the sort info; null unless it was set by the STORE constructor
   *         that takes sort columns (and SortInfo.parse returned one)
   */
  public SortInfo getSortInfo() {
    return this.mSortInfo;
  }

  /**
   * @return true if a non-empty sort info was parsed at construction
   */
  public boolean isSorted() {
    return this.mSorted;
  }

  /**
   * @return the comparator recorded at construction; may be null if none was
   *         recorded
   */
  public String getComparator() {
    return this.comparator;
  }

  /**
   * @return the table schema this partition was built for
   */
  public Schema getSchema() {
    return this.mSchema;
  }

  /**
   * @return the partition info created at construction and handed to the
   *         storage parser through this object
   */
  public PartitionInfo getPartitionInfo() {
    return this.mPartitionInfo;
  }

  /**
   * @return all column group schemas, as the internal (not copied) array
   */
  public CGSchema[] getCGSchemas() {
    return this.mCGSchemas;
  }

  /**
   * Returns one particular column group schema.
   *
   * @param index position in the array of column group schemas; it is not
   *              range-checked here
   * @return the schema at that position, or null if no column group schemas
   *         are available
   */
  public CGSchema getCGSchema(int index) {
    return (mCGSchemas == null ? null : mCGSchemas[index]);
  }

  /**
   * Maps a projected column name to the column group entries that hold it.
   *
   * Search from the most specific name until the least specific: if none found
   * return null. For MAP split columns, it returns the catch-all CG, not the
   * CGs with specific keys, if this is not a key projection; otherwise a map
   * of CG fields to projected keys is returned.
   *
   * @param schema        table schema (not read by this method)
   * @param name          fully qualified name of the projected column
   * @param pn            parsed name; its data type is set to MAP or ANY
   *                      when the full name is found in the column map
   * @param projectedKeys hash keys projected on the column, or null
   * @return a map from mapping entry to the projected keys it serves (the
   *         value is null for non-key mappings), or null if nothing matches
   * @throws ParseException declared by the signature
   */
  private HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>> getColMapping(Schema schema,
      String name, Schema.ParsedName pn, HashSet<String> projectedKeys) throws ParseException {
    Map<String, HashSet<PartitionInfo.ColumnMappingEntry>> colmap =
        mPartitionInfo.mColMap;
    int fromIndex = name.length() - 1, lastHashIndex, lastFieldIndex;

    // first try the full name
    HashSet<PartitionInfo.ColumnMappingEntry> results = colmap.get(name);
    HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>> result = null;
    HashSet<String> keys;
    String ancestorName;
    boolean map = false;
    if (results != null)
    {
      if (projectedKeys != null)
      {
        // key projection on the column itself: resolved key by key below
        pn.setDT(ColumnType.MAP);
        map = true;
      } else {
        pn.setDT(ColumnType.ANY);
        PartitionInfo.ColumnMappingEntry cme;
        // return the first entry that carries no keys; null if every entry
        // is bound to specific keys
        for (Iterator<PartitionInfo.ColumnMappingEntry> it = results.iterator(); it.hasNext(); )
        {
          cme = it.next();
          if (cme.getKeys() == null)
          {
            // no specific keys are interested. Either a MAP-split without key or a RECORD-split is OK
            result = new HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>>();
            result.put(cme, null);
            return result;
          }
        }
        return null;
      }
    }

    // full name not found: strip trailing ".field" / "#key" components one
    // at a time and retry with the shorter (ancestor) name
    while (results == null) {
      lastHashIndex = name.lastIndexOf('#', fromIndex);
      lastFieldIndex = name.lastIndexOf('.', fromIndex);
      if (lastHashIndex == -1 && lastFieldIndex == -1) break;
      else if (lastHashIndex == -1) {
        fromIndex = lastFieldIndex;
      }
      else if (lastFieldIndex == -1) {
        // only a '#' separator is left: the match, if any, is a MAP
        fromIndex = lastHashIndex;
        map = true;
      }
      else {
        // two adjacent separators: give up
        if (lastHashIndex == lastFieldIndex - 1
            || lastFieldIndex == lastHashIndex - 1) break;
        // cut at whichever separator comes last; map is not switched on in
        // this branch
        fromIndex =
            (lastFieldIndex > lastHashIndex ? lastFieldIndex : lastHashIndex);
      }
      if (fromIndex <= 0) break;
      ancestorName = name.substring(0, fromIndex);
      results = colmap.get(ancestorName);
      fromIndex--;
    }
    if (results != null) {
      if (map)
      {
        // build a HashMap from ColumnGroupMappingEntry to a set of projected keys for MAP-split
        if (results.isEmpty())
          throw new AssertionError( "Internal Logical Error: split is expected.");
        PartitionInfo.ColumnMappingEntry thisCG, defaultCG = null;
        boolean found;
        String projectedKey;
        // NOTE(review): projectedKeys is dereferenced without a null check;
        // this assumes it is non-null whenever map is true, including when
        // map was switched on by the '#' search above - TODO confirm
        for (Iterator<String> projectedKeyIt = projectedKeys.iterator(); projectedKeyIt.hasNext();)
        {
          projectedKey = projectedKeyIt.next();
          found = false;
          thisCG = null;
          // look for the entry bound to this key, remembering the entry
          // without keys (the default) on the way
          for (Iterator<PartitionInfo.ColumnMappingEntry> it = results.iterator();
              it.hasNext(); )
          {
            thisCG = it.next();
            keys = thisCG.getKeys();
            if (keys == null)
            {
              defaultCG = thisCG;
            } else {
              if (keys.contains(projectedKey))
              {
                found = true;
                break;
              }
            }
          }
          
          if (!found)
          {
            // no entry is bound to this key: fall back to the default
            if (defaultCG == null)
              throw new AssertionError( "Internal Logical Error: default MAP split CG is missing.");

            thisCG =defaultCG;
          }
          if (result == null)
            result = new HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>>();
          if ((keys = result.get(thisCG)) == null)
          {
            keys = new HashSet<String>();
            result.put(thisCG, keys);
          }
          keys.add(projectedKey);
        }
        // NOTE(review): results.isEmpty() was already rejected at the top of
        // this branch and results is not modified in between, so this throw
        // looks unreachable; result stays null when projectedKeys is empty
        if (result == null)
          if (results.isEmpty())
           throw new AssertionError( "Internal Logical Error: Default MAP split column is missing.");
      } else {
        // a RECORD-split
        if (results.size() != 1)
         throw new AssertionError(
           "Internal Logical Error: A single split is expected.");
        result = new HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>>();
        result.put(results.iterator().next(), null);
        return result;
      }
      // discard the CG fq name
      fromIndex += 2;
      // NOTE(review): the non-map case returned above, so map is always true
      // here and the block below looks unreachable; as written, pn is never
      // re-based on the matched CG - confirm whether that is intended
      if (!map)
      {
        // no need to crawl down after encountering a MAP-split subcolumn
        pn.setName(name.substring(fromIndex));
        pn.parseName(results.iterator().next().getColumnSchema());
      }
    }
    return result;
  }

  /**
   * Recursively builds the stitch executions for the sub-columns of a column.
   * <p>
   * Every sub-column of {@code fs} is handled in one of three ways:
   * <ul>
   * <li>no column group (CG) mapping is registered for it: an intermediate
   * partitioned column is created, the traversal descends into the sub-column,
   * and only then is the new column appended to the executions;</li>
   * <li>it has a CG mapping and an empty split map: a partitioned column is
   * registered as a user of that CG;</li>
   * <li>it has a CG mapping and a non-empty split map (MAP-split): the work is
   * handed over to {@code handleMapStitch}.</li>
   * </ul>
   *
   * @param fs the column whose sub-columns are visited
   * @param pn parsed name of the projection; it is re-parsed against
   *          {@code fs} before the sub-columns are visited
   * @param parent partitioned column that receives the columns created here
   * @throws ParseException if raised by one of the invoked helpers
   * @throws IOException if raised by one of the invoked helpers
   * @throws AssertionError if {@code fs} has no sub-columns to descend into
   */
  private void buildStitch(Schema.ColumnSchema fs, Schema.ParsedName pn,
      PartitionedColumn parent) throws ParseException, IOException {
    final Schema subSchema = fs.getSchema();
    if (subSchema == null || subSchema.getNumColumns() <= 0) {
      throw new AssertionError(
          "Internal Logical Error: Leaf type must have a CG ancestor or is CG itself.");
    }

    pn.parseName(fs);
    final Schema.ParsedName childName = new Schema.ParsedName();
    for (int pos = 0; pos < subSchema.getNumColumns(); pos++) {
      // reset the name handed to this sub-column to the (parsed) name and type
      // of the current level; the recursive call below may have altered it
      childName.setName(new String(pn.getName()), pn.getDT());
      final Schema.ColumnSchema child = subSchema.getColumn(pos);

      if (getCGIndex(child) == null) {
        // not a CG: go one level lower. Depth-first, so whatever the recursion
        // appends to the executions comes before this column.
        final PartitionedColumn inner = new PartitionedColumn(pos, true);
        mPCNeedTmpTuple.add(inner);
        buildStitch(child, childName, inner);
        mExecs.add(inner);
        mStitchSize++;
        parent.addChild(inner);
        continue;
      }

      if (getSplitMap(child).isEmpty()) {
        // this subtype is not MAP-split
        final CGEntry owner = getCGEntry(getCGIndex(child).getCGIndex());
        final PartitionedColumn leaf = new PartitionedColumn(pos, false);
        owner.addUser(leaf, getCGName(child));
        parent.addChild(leaf);
        continue;
      }

      // this subtype is MAP-split: collect the splits that carry no explicit
      // key set (all keys must be included) and let handleMapStitch do the rest
      final Map<String, HashSet<PartitionInfo.ColumnMappingEntry>> splitsByName =
          mPartitionInfo.mColMap;
      final HashSet<PartitionInfo.ColumnMappingEntry> mapSplits =
          splitsByName.get(getCGName(child));
      final HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>> keylessSplits =
          new HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>>();
      for (PartitionInfo.ColumnMappingEntry split : mapSplits) {
        if (split.getKeys() == null) {
          keylessSplits.put(split, split.getKeys());
        }
      }
      final int fieldIndex = getCGIndex(child).getFieldIndex();
      handleMapStitch(childName, parent, child, pos, fieldIndex, keylessSplits);
    }
  }

  /**
   * Builds the stitches for a MAP-split (sub)column.
   * <p>
   * If the type of {@code pn} is not {@code ColumnType.ANY}, the projection is
   * on specific keys: one partitioned column is registered per entry of
   * {@code cgindices}. When there is more than one entry, the column created
   * for the first entry is a MAP stitch that becomes the parent of the columns
   * created for the remaining entries.
   * <p>
   * If the type is {@code ColumnType.ANY}, the projection is on the whole map:
   * a MAP stitch is created for the column group of {@code child}, and one
   * child column is added to it for every distinct column group found in the
   * split map of {@code child}.
   *
   * @param pn parsed name of the projection; only its type is consulted
   * @param parent partitioned column that receives the columns created here
   * @param child the MAP-split column
   * @param i index given to the column created directly under
   *          {@code parent}
   * @param fi not used by this method
   * @param cgindices column group mappings with their projected keys; for a
   *          whole-map projection it must hold exactly one entry whose value
   *          is {@code null}
   * @throws IOException if raised by one of the invoked helpers
   * @throws AssertionError if {@code cgindices} does not satisfy the
   *           conditions above
   */
  private void handleMapStitch(Schema.ParsedName pn,
      PartitionedColumn parent, Schema.ColumnSchema child, int i,
      int fi, HashMap<PartitionInfo.ColumnMappingEntry, HashSet<String>> cgindices) throws IOException {
    if (pn.getDT() != ColumnType.ANY) {
      // this sub-type is MAP split and the projection is on specific keys:
      // => need to add a specific key stitch
      if (cgindices == null) {
        throw new AssertionError(
            "Internal Logical Error: MAP key set is empty.");
      }

      final Set<Map.Entry<PartitionInfo.ColumnMappingEntry, HashSet<String>>> splits =
          cgindices.entrySet();
      // with several contributing CGs the first column acts as MAP parent
      boolean parentPending = (splits.size() > 1);
      boolean underMapParent = false;
      for (Map.Entry<PartitionInfo.ColumnMappingEntry, HashSet<String>> split : splits) {
        final HashSet<String> projectedKeys = split.getValue();
        final CGEntry owner = getCGEntry(split.getKey().getCGIndex());
        final PartitionedColumn column;
        if (parentPending) {
          column = new PartitionedColumn(i, Partition.SplitType.MAP, false);
          mExecs.add(column); // not a leaf : MAP stitch needed
          mStitchSize++;
          parent.addChild(column);
          // the remaining columns hang below the MAP stitch just created
          parent = column;
          parentPending = false;
          underMapParent = true;
        } else {
          column = new PartitionedColumn(underMapParent ? 0 : i, false);
          parent.addChild(column);
        }
        owner.addUser(column, getCGName(child), projectedKeys);
      }
      return;
    }

    // this subtype is MAP split and the projection is on the whole MAP:
    // => need to add stitches for all split keys
    if (cgindices == null || cgindices.size() != 1) {
      throw new AssertionError(
          "Internal Logical Error: Invalid map key size.");
    }
    if (cgindices.entrySet().iterator().next().getValue() != null) {
      throw new AssertionError( "Internal Logical Error: RECORD should not have a split key map.");
    }

    // first the map partitioned column that contains all non-key-partitioned
    // hashes
    final CGEntry mapOwner = getCGEntry(getCGIndex(child).getCGIndex());
    final PartitionedColumn mapColumn =
        new PartitionedColumn(i, Partition.SplitType.MAP, false);
    mapOwner.addUser(mapColumn, getCGName(child));
    mExecs.add(mapColumn); // not a leaf : MAP stitch needed
    mStitchSize++;
    parent.addChild(mapColumn);

    // then the map-key partitioned columns
    final HashSet<Integer> seenCGs = new HashSet<Integer>();
    for (PartitionInfo.ColumnMappingEntry split : getSplitMap(child)) {
      final int cgIndex = split.getCGIndex();
      final CGEntry owner = getCGEntry(cgIndex);
      // if the CG is already included in this sub-column stitch, there is no
      // need to add it as a separate stitch: all its keys are already included
      if (!seenCGs.add(cgIndex)) {
        continue;
      }
      final PartitionedColumn keyColumn = new PartitionedColumn(0, false);
      owner.addUser(keyColumn, getCGName(child), split.getKeys());
      // contributes to the non-key-partitioned hashes
      mapColumn.addChild(keyColumn);
    }
  }

  /**
   * Returns the entry kept for a column group, creating and registering it in
   * {@code mCGs} on first use.
   * <p>
   * Whenever a new entry is registered, {@code mCGList} is rebuilt from the
   * values of {@code mCGs} so that the array does not have to be constructed
   * again later [performance].
   *
   * @param cgindex index of the column group
   * @return the existing or the newly created entry
   */
  private CGEntry getCGEntry(int cgindex) {
    final CGEntry known = mCGs.get(cgindex);
    if (known != null) {
      return known;
    }

    final CGEntry created = new CGEntry(cgindex);
    mCGs.put(cgindex, created);
    mCGList = (CGEntry[])mCGs.values().toArray(new CGEntry[mCGs.size()]);
    return created;
  }

  /**
   * Recursively builds the split executions for the sub-columns of a column.
   * <p>
   * A sub-column without a column group (CG) mapping gets an intermediate
   * partitioned column that is appended to the executions before the traversal
   * descends into it. A sub-column with a CG mapping is either registered as a
   * user of its CG (empty split map) or handed over to
   * {@code handleMapSplit} (MAP-split).
   *
   * @param fs the column whose sub-columns are visited
   * @param parent partitioned column that receives the columns created here
   * @throws ParseException if raised by one of the invoked helpers
   * @throws IOException if raised by one of the invoked helpers
   * @throws AssertionError if {@code fs} has no sub-columns to descend into
   */
  private void buildSplit(Schema.ColumnSchema fs, PartitionedColumn parent)
      throws ParseException, IOException {
    final Schema subSchema = fs.getSchema();
    if (subSchema == null || subSchema.getNumColumns() <= 0) {
      throw new AssertionError(
          "Internal Logical Error: Leaf type must have a CG ancestor or is CG itself.");
    }

    for (int pos = 0; pos < subSchema.getNumColumns(); pos++) {
      final Schema.ColumnSchema child = subSchema.getColumn(pos);
      final PartitionInfo.ColumnMappingEntry mapping = getCGIndex(child);

      if (mapping == null) {
        // not a CG: go one level lower; this column is appended to the
        // executions before anything the recursion adds
        final PartitionedColumn inner = new PartitionedColumn(pos, false);
        mExecs.add(inner);
        mSplitSize++;
        parent.addChild(inner);
        buildSplit(child, inner);
        continue;
      }

      final CGEntry owner = getCGEntry(mapping.getCGIndex());
      if (!getSplitMap(child).isEmpty()) {
        // this subfield is MAP-split
        // => need to add splits for all split keys
        handleMapSplit(parent, child, pos, owner, mapping.getFieldIndex());
        continue;
      }

      // this subfield is not MAP-split
      final PartitionedColumn leaf = new PartitionedColumn(pos, false);
      owner.addUser(leaf, getCGName(child));
      leaf.setProjIndex(mapping.getFieldIndex());
      parent.addChild(leaf);
    }
  }

  /**
   * Builds the splits for a MAP-split (sub)column.
   * <p>
   * A MAP partitioned column is created for {@code child}, registered with
   * {@code cgentry}, appended to the executions and added to {@code parent}.
   * Then, for every entry in the split map of {@code child}, a key-restricted
   * column is registered with that entry's column group, added as a child of
   * the MAP column and recorded in {@code mPCNeedMap}.
   *
   * @param parent partitioned column that receives the MAP column
   * @param child the MAP-split column
   * @param i index given to the MAP column
   * @param cgentry column group entry the MAP column is registered with
   * @param childProjIndex projection index set on the MAP column
   * @throws ParseException if raised by one of the invoked helpers
   * @throws IOException if raised by one of the invoked helpers
   */
  private void handleMapSplit(PartitionedColumn parent,
      Schema.ColumnSchema child, int i, CGEntry cgentry, int childProjIndex) throws ParseException, IOException {
    // first the map partitioned column that contains all non-key-partitioned
    // hashes
    final PartitionedColumn mapColumn =
        new PartitionedColumn(i, Partition.SplitType.MAP, false);
    cgentry.addUser(mapColumn, getCGName(child));
    mapColumn.setProjIndex(childProjIndex);
    mExecs.add(mapColumn); // not a leaf : MAP split needed
    mSplitSize++;
    parent.addChild(mapColumn);

    // then the map-key partitioned columns
    for (PartitionInfo.ColumnMappingEntry split : getSplitMap(child)) {
      final CGEntry owner = getCGEntry(split.getCGIndex());
      final HashSet<String> splitKeys = split.getKeys();

      final PartitionedColumn keyColumn = new PartitionedColumn(0, false);
      keyColumn.setKeys(splitKeys);
      owner.addUser(keyColumn, getCGName(child), splitKeys);
      keyColumn.setProjIndex(split.getFieldIndex());

      // contributes to the non-key-partitioned hashes
      mapColumn.addChild(keyColumn);
      mPCNeedMap.add(keyColumn); // children need the tmp map
    }
  }

  /**
   * Reads in a tuple based on the stitches.
   * <p>
   * The tuple is handed to the last stitch execution, every column group
   * entry in {@code mCGList} is asked to read, and then the first
   * {@code mStitchSize} executions are stitched in order. The call returns
   * without doing anything when there are no stitches or no column groups.
   *
   * @param t the tuple passed to the last stitch execution
   * @throws IOException if raised while reading or stitching
   * @throws Exception if raised while reading or stitching
   */
  public void read(Tuple t) throws AssertionError, IOException, Exception {
    final boolean nothingToStitch =
        mStitchSize == 0 || mCGs == null || mCGList.length == 0;
    if (nothingToStitch) {
      return;
    }

    // dispatch
    mExecs.get(mStitchSize - 1).setRecord(t);

    // read in CG data
    for (int cg = 0; cg < mCGList.length; cg++) {
      mCGList[cg].read();
    }

    // start the stitch
    for (int step = 0; step < mStitchSize; step++) {
      mExecs.get(step).stitch();
    }
  }

  /**
   * Inserts a tuple after splitting it.
   * <p>
   * The tuple is handed to the first execution, the maps of all columns in
   * {@code mPCNeedMap} are cleared, the first {@code mSplitSize} executions
   * are split in order, and finally every column group entry in
   * {@code mCGList} is asked to insert under {@code key}.
   *
   * @param key the key passed to every column group entry
   * @param t the tuple to split
   * @throws AssertionError if there are no splits or no column groups
   * @throws IOException if raised while splitting or inserting
   * @throws Exception if raised while splitting or inserting
   */
  public void insert(final BytesWritable key, final Tuple t)
      throws AssertionError, IOException, Exception {
    final boolean nothingToSplit =
        mSplitSize == 0 || mCGs == null || mCGList.length == 0;
    if (nothingToSplit) {
      throw new AssertionError("Empty Column Group List!");
    }

    // dispatch
    mExecs.get(0).setRecord(t);
    for (int col = 0; col < mPCNeedMap.size(); col++) {
      mPCNeedMap.get(col).clearMap();
    }

    for (int step = 0; step < mSplitSize; step++) {
      mExecs.get(step).split();
    }

    // insert CG data
    for (int cg = 0; cg < mCGList.length; cg++) {
      mCGList[cg].insert(key);
    }
  }

  /**
   * Sets the source tuples for the column group operations.
   * <p>
   * {@code tuples[i]} is the source for the column group entry stored under
   * key {@code i} in {@code mCGs}. Positions without an entry are ignored. An
   * entry whose tuple is {@code null} is cleaned up and removed, after which
   * {@code mCGList} is rebuilt from the remaining entries.
   *
   * @param tuples source tuples, indexed by column group
   * @throws ParseException if fewer tuples than registered column groups are
   *           supplied, or if raised while setting a source
   */
  public void setSource(Tuple[] tuples) throws ParseException {
    if (tuples.length < mCGs.size()) {
      throw new ParseException(
          "Internal Logical Error: Invalid number of column groups");
    }

    for (int cg = 0; cg < tuples.length; cg++) {
      final CGEntry entry = mCGs.get(cg);
      if (entry == null) {
        continue;
      }

      final Tuple source = tuples[cg];
      if (source != null) {
        entry.setSource(source);
        continue;
      }

      // no source for this column group: drop its entry
      entry.cleanup();
      mCGs.remove(cg);
      // Constructing a collection of mCGs so that
      // we don't have to do it again [performance]
      mCGList = (CGEntry[])mCGs.values().toArray(new CGEntry[mCGs.size()]);
    }
  }

  /**
   * Returns the projection schema for a particular column group.
   *
   * @param cgindex index of the column group
   * @return the projection reported by the entry stored under
   *         {@code cgindex} in {@code mCGs}, or {@code null} if there is no
   *         such entry
   * @throws ParseException if raised while obtaining the projection
   */
  public String getProjection(int cgindex) throws ParseException {
    final CGEntry entry = mCGs.get(cgindex);
    return (entry == null) ? null : entry.getProjection();
  }

  /**
   * Returns the table projection.
   *
   * @return the value of {@code mProjection}
   */
  public Projection getProjection() {
    final Projection tableProjection = mProjection;
    return tableProjection;
  }

  /**
   * Returns the split map that the partition information holds for a column.
   * Within this class an empty result is treated as "not MAP-split".
   *
   * @param fs the column to look up
   * @return whatever {@code mPartitionInfo.getSplitMap(fs)} returns
   */
  public HashSet<PartitionInfo.ColumnMappingEntry> getSplitMap(
      Schema.ColumnSchema fs) {
    final HashSet<PartitionInfo.ColumnMappingEntry> splits =
        mPartitionInfo.getSplitMap(fs);
    return splits;
  }

  /**
   * Generates the schema of the default column group by delegating to the
   * partition information. All arguments are passed through unchanged.
   *
   * @return whatever {@code mPartitionInfo.generateDefaultCGSchema} returns
   * @throws ParseException if raised by the partition information
   */
  public CGSchema generateDefaultCGSchema(String name, String compressor, String serializer,
      String owner, String group, short perm, 
      final int defaultCGIndex, String comparator) throws ParseException {
    return mPartitionInfo.generateDefaultCGSchema(
        name,
        compressor,
        serializer,
        owner,
        group,
        perm,
        defaultCGIndex,
        comparator);
  }

  /**
   * Records split information for a column by delegating to the partition
   * information. All arguments are passed through unchanged.
   *
   * @throws ParseException if raised by the partition information
   */
  public void setSplit(Schema.ColumnSchema fs, SplitType st, SplitType cst, String name, String childName, boolean splitChild) throws ParseException {
    mPartitionInfo.setSplit(
        fs,
        st,
        cst,
        name,
        childName,
        splitChild);
  }

  /**
   * Records the column group mapping of a column by delegating to the
   * partition information. All arguments are passed through unchanged.
   *
   * @return whatever {@code mPartitionInfo.setCGIndex} returns
   */
  public boolean setCGIndex(Schema.ColumnSchema fs, int ri, int fi, String name) {
    final boolean outcome = mPartitionInfo.setCGIndex(fs, ri, fi, name);
    return outcome;
  }

  /**
   * Looks up the column group mapping of a column.
   *
   * @param fs the column to look up
   * @return the mapping held by the column's field info, or {@code null} if
   *         the partition information has no field info for {@code fs}
   */
  PartitionInfo.ColumnMappingEntry getCGIndex(Schema.ColumnSchema fs) {
    final PartitionInfo.PartitionFieldInfo fieldInfo =
        mPartitionInfo.fieldMap.get(fs);
    return (fieldInfo == null) ? null : fieldInfo.getCGIndex();
  }

  /**
   * Looks up the fully qualified name for a CG column.
   *
   * @param fs the column to look up
   * @return the name held by the column's field info, or {@code null} if the
   *         partition information has no field info for {@code fs}
   */
  String getCGName(Schema.ColumnSchema fs) {
    final PartitionInfo.PartitionFieldInfo fieldInfo =
        mPartitionInfo.fieldMap.get(fs);
    return (fieldInfo == null) ? null : fieldInfo.getCGName();
  }
  
  /**
   * Tells whether a column group is in use.
   *
   * @param i index of the column group
   * @return {@code true} if {@code mCGs} holds an entry under key {@code i}
   */
  public boolean isCGNeeded(int i) {
    final boolean registered = mCGs.containsKey(i);
    return registered;
  }
}
